Feat/cron#913
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a new unified memory and session architecture, adding a pluggable MemoryBackend system (with adapters for file-based, ReMe, mem0, mempalace, ByteRover, and Supermemory storage) and a non-destructive ContextAssembler for message history compaction. It also introduces a standalone CronService and CronTool for managing scheduled agent tasks. The review comments correctly identify critical issues, including potential CPU exhaustion/hangs in the cron schedule catch-up logic, platform compatibility issues with os.fork on Windows, and blocking synchronous thread joins (self._sync_thread.join) that degrade the performance of the asynchronous event loop in the Supermemory and ByteRover adapters.
| def _background_add(self, content: str) -> None: | ||
| """Add a memory document in a background thread.""" | ||
| if self._sync_thread and self._sync_thread.is_alive(): | ||
| self._sync_thread.join(timeout=2.0) | ||
|
|
||
| def _add(): | ||
| try: | ||
| self._client.documents.add( | ||
| content=content, | ||
| container_tags=[self._container_tag], | ||
| entity_context=self._entity_context, | ||
| metadata={ | ||
| "source": "ms-agent", | ||
| "type": "conversation_turn", | ||
| }, | ||
| ) | ||
| except Exception as e: | ||
| logger.debug("[supermemory_backend] add failed: %s", e) |
There was a problem hiding this comment.
Calling self._sync_thread.join(timeout=2.0) synchronously inside the main thread blocks the asyncio event loop, which can severely degrade the responsiveness of the asynchronous agent framework. Since this is a fire-and-forget background operation, consider running it completely asynchronously using loop.run_in_executor to avoid blocking the event loop.
def _background_add(self, content: str) -> None:
"""Add a memory document in a background thread."""
def _add():
try:
self._client.documents.add(
content=content,
container_tags=[self._container_tag],
entity_context=self._entity_context,
metadata={
"source": "ms-agent",
"type": "conversation_turn",
},
)
except Exception as e:
logger.debug("[supermemory_backend] add failed: %s", e)
import asyncio
try:
loop = asyncio.get_running_loop()
loop.run_in_executor(None, _add)
except RuntimeError:
self._sync_thread = threading.Thread(target=_add, daemon=True, name="supermemory-add")
self._sync_thread.start()| def _background_curate(self, content: str, wait: bool = False) -> None: | ||
| """Run ``brv curate`` in a background thread.""" | ||
| if self._sync_thread and self._sync_thread.is_alive(): | ||
| self._sync_thread.join(timeout=5.0) | ||
|
|
||
| def _curate(): | ||
| try: | ||
| _run_brv( | ||
| ["curate", "--", content], | ||
| timeout=self._curate_timeout, cwd=self._cwd, | ||
| ) | ||
| except Exception as e: | ||
| logger.debug("[byterover_backend] curate failed: %s", e) | ||
|
|
||
| self._sync_thread = threading.Thread( | ||
| target=_curate, daemon=True, name="brv-curate") | ||
| self._sync_thread.start() | ||
|
|
||
| if wait: | ||
| self._sync_thread.join(timeout=float(self._curate_timeout)) |
There was a problem hiding this comment.
Calling self._sync_thread.join(timeout=5.0) synchronously inside the main thread blocks the asyncio event loop, which can severely degrade the responsiveness of the asynchronous agent framework. Since this is a fire-and-forget background operation when wait is False, consider running it completely asynchronously using loop.run_in_executor to avoid blocking the event loop.
def _background_curate(self, content: str, wait: bool = False) -> None:
"""Run ``brv curate`` in a background thread."""
def _curate():
try:
_run_brv(
["curate", "--", content],
timeout=self._curate_timeout, cwd=self._cwd,
)
except Exception as e:
logger.debug("[byterover_backend] curate failed: %s", e)
if wait:
_curate()
else:
import asyncio
try:
loop = asyncio.get_running_loop()
loop.run_in_executor(None, _curate)
except RuntimeError:
self._sync_thread = threading.Thread(target=_curate, daemon=True, name="brv-curate")
self._sync_thread.start()| task.add_done_callback(self._background_tasks.discard) | ||
|
|
||
| async def _execute_job(self, job: CronJobSpec, state: CronJobState) -> None: | ||
| self._manager.mark_running(job.id) |
There was a problem hiding this comment.
_on_due_jobs 用 create_task 异步执行,但 mark_running 在 _execute_job 里才调用:
service.py
Lines 251-266
async def _on_due_jobs(self, due: ...):
for job, state in due:
task = asyncio.create_task(self._execute_job(job, state), ...)
# 立即返回,不 await
async def _execute_job(self, job, state):
self._manager.mark_running(job.id) # 晚于 create_task
在 mark_running 写入前,状态仍是 scheduled,下一次 tick 可能再次判定 due → 同一 job 并行跑两次。
建议:在 create_task 之前同步 mark_running(job.id),或 due 判定时用内存级 in-flight set。
There was a problem hiding this comment.
asyncio 单线程模型下,FIFO 顺序保证 _execute_job 先于 _sleep_and_tick 开始执行,到第一个 await 前是同步操作,因此会先完成。
| schedule_str=schedule, | ||
| prompt=prompt, | ||
| name=args.get('name', ''), | ||
| ) |
There was a problem hiding this comment.
没有 project / workflow 。CLI cron create --project 有project,这里没有。是符合预期的吗
There was a problem hiding this comment.
这是当时简单起见的预留,因为路径解析会引入相对复杂的讨论,现在加上了。
| for job, state in jobs_and_states: | ||
| if not job.enabled: | ||
| continue | ||
| if state.status == 'paused': |
There was a problem hiding this comment.
manual_tick 未跳过 running/completed ,这是符合预期的吗
Change Summary
Add support for cron as a CLI, tool, and service.
Related issue number
Checklist
pre-commit installandpre-commit run --all-filesbefore git commit, and passed lint check.